package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

/**
 * Minimal Spark Structured Streaming example that watches a directory and prints every new line.
 *
 * <p>The source directory is read from {@link ConfigurationManager} under the key
 * {@link Constants#HDFS_SOURCE_PATH}. Files appearing in that directory are read with the
 * {@code text} source, so every row has a single string column named {@code value}. Rows are
 * written to the console sink in {@code append} mode without truncation.
 *
 * <p>Originally created by RQL on 2019-01-07.
 */
public class StructuredStreamingFile {

    /** Spark master URL used when none is supplied on the command line. */
    private static final String DEFAULT_MASTER = "local[4]";

    /**
     * Starts the file-source streaming query and blocks until it terminates.
     *
     * @param args optional; a non-blank {@code args[0]} overrides the Spark master URL
     *     (default {@value #DEFAULT_MASTER})
     * @throws IllegalStateException if the source path is not configured, or if the streaming
     *     query terminates with an error (the original exception is kept as the cause)
     */
    public static void main(String[] args) {
        String master = (args != null && args.length > 0 && args[0] != null && !args[0].trim().isEmpty())
                ? args[0].trim()
                : DEFAULT_MASTER;

        // Resolve configuration before creating the session so a misconfiguration fails fast
        // with a clear message instead of an obscure error from inside Spark.
        String sourcePath = ConfigurationManager.getProperty(Constants.HDFS_SOURCE_PATH);
        if (sourcePath == null || sourcePath.trim().isEmpty()) {
            throw new IllegalStateException(
                    "Missing configuration property: " + Constants.HDFS_SOURCE_PATH);
        }

        SparkSession spark = SparkSession.builder()
                .appName("StructuredStreamingFile")
                .master(master)
                .getOrCreate();

        try {
            // The text source needs no schema: each row holds one line in the "value" column.
            // The csv, json and parquet sources instead require an explicit schema, e.g.
            //   StructType userSchema = new StructType().add("name", "string").add("age", "integer");
            //   spark.readStream().option("sep", ";").schema(userSchema).csv(sourcePath);
            Dataset<Row> lines = spark
                    .readStream()
                    // Do not prioritize the newest files (default: false); doing so mainly helps
                    // when there is a large backlog of files to work through.
                    .option("latestFirst", "false")
                    // Detect new files by full path (default: false). Matching on file name only
                    // would treat same-named files in different directories as one file.
                    .option("fileNameOnly", "false")
                    .text(sourcePath);

            lines.printSchema();

            StreamingQuery query = lines.writeStream()
                    .outputMode("append")
                    .format("console")
                    .option("truncate", "false")
                    .start();

            query.awaitTermination();
        } catch (StreamingQueryException e) {
            // Propagate rather than print-and-return, so a failed query yields a non-zero exit.
            throw new IllegalStateException("Streaming query terminated with an error", e);
        } finally {
            // Release the session whether the query ended normally or with an error.
            spark.stop();
        }
    }
}
